package onion.mqtt.server;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.timeout.IdleStateHandler;

import java.io.File;
import java.util.concurrent.TimeUnit;

/**
 * @author Mr, Lu
 * @developmentTeam 浙江允泽信息科技有限公司
 * @createTime 2023/12/12
 */
public class MqttServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    private final MqttServerConfig config;
    private final MqttServerInboundHandler handler;

    public MqttServerChannelInitializer(MqttServerConfig config, MqttServerInboundHandler handler) {
        this.config = config;
        this.handler = handler;
    }

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        // 添加心跳检测处理器
        pipeline.addLast("idle", new IdleStateHandler(
                config.getKeepAlive(),
                config.getKeepAlive(),
                config.getKeepAlive(),
                TimeUnit.SECONDS
        ));
        // SSL连接处理器
        if (config.getSsl().isEnabled()) {
            // ssl
            SslContext sslContext = SslContextBuilder.forServer(
                            new File(config.getSsl().getServerCertFile()),
                            new File(config.getSsl().getKeyFile()))
                    .trustManager(new File(config.getSsl().getCaCertFile())).build();
            pipeline.addLast("ssl", sslContext.newHandler(channel.alloc()));
        }
        // 编解码处理器
        pipeline.addLast("encoder", MqttEncoder.INSTANCE);
        pipeline.addLast("decoder", new MqttDecoder(config.getMaxBytesInMessage()));
        // 消息处理器
        pipeline.addLast("channelHandler",handler);
    }
}
